package com.starsky.common.rabbitmq.service;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

@Slf4j
public abstract class ReadMQHandler {

    /**
     * 业务处理
     *
     * @param json：业务数据
     */
    public abstract void handler(String json);

    /**
     * 消息确认机制，默认是自动确认，消费者接收到，自动清除，这里使用手动确认
     * 手动确认方式：
     * basic.ack用于肯定确认，消费成功
     * basic.nack用于否定确认（注意：这是AMQP 0-9-1的RabbitMQ扩展）
     * basic.reject用于否定确认，但与basic.nack相比有一个限制:一次只能拒绝单条消息
     *
     * @param channel
     * @param deliveryTag
     * @param json
     */
    public void readMq(Channel channel, Long deliveryTag, String json) {

        //手动ACK
        //默认情况下如果一个消息被消费者所正确接收则会被从队列中移除
        //如果一个队列没被任何消费者订阅，那么这个队列中的消息会被 Cache（缓存），
        //当有消费者订阅时则会立即发送，当消息被消费者正确接收时，就会被从队列中移除,这样以后就不会再发了
//        否则消息服务器以为这条消息没处理掉 后续还会在发，true确认所有消费者获得的消息
        log.info("[消息内容：{}]", json);
        boolean flag = true;
        try {
            //业务处理
            handler(json);
            /**
             * basicAck(deliveryTag, true): 该消息的index
             * 第一个参数依然是当前消息到的数据的唯一id;
             * 第二参数是否批量.true:将一次性拒绝所有小于deliveryTag的消息;false表示通知 RabbitMQ 当前消息被确认.
             * requeue：被拒绝的是否重新入队列
             */
            channel.basicAck(deliveryTag, true);
            flag = false;
            log.info("消息消费成功：messageId：{}", deliveryTag);
        } catch (Exception e) {
            log.info("[ 处理消息队列内容异常，messageId：{}，异常内容： ]", deliveryTag, e);
            try {
                /**
                 * channel.basicReject(deliveryTag, true);  拒绝消费当前消息，
                 * 第一个参数依然是当前消息到的数据的唯一id;
                 * 第二参数传入true，就是将数据重新丢回队列里，那么下次还会消费这消息。
                 * 设置false，就是告诉服务器，我已经知道这条消息数据了，因为一些原因拒绝它，而且服务器也把这个消息丢掉就行。
                 * 下次不想再消费这条消息了。
                 */
                channel.basicReject(deliveryTag, false);
            } catch (IOException e1) {
                e1.printStackTrace();
                log.error("[ 丢弃消息异常，messageId：{}，异常内容： ]", deliveryTag, e);
            }
        } finally {
            if (flag) {
                //丢弃这条消息
                try {
                    /**
                     * channel.basicNack(deliveryTag, false, true);
                     * 第一个参数依然是当前消息到的数据的唯一id;
                     * 第二个参数是指是否针对多条消息；如果是true，也就是说一次性针对当前通道的消息的tagID小于当前这条消息的，都拒绝确认。
                     * 第三个参数是指是否重新入列，也就是指不确认的消息是否重新丢回到队列里面去。
                     */
                    channel.basicNack(deliveryTag, false, true);
                    //拒绝消息
                    //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
                    //消息被丢失
                    //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                    //消息被重新发送
                    //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
                    //多条消息被重新发送
                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
                } catch (IOException e1) {
                    e1.printStackTrace();
                    log.error("消息消费失败:", e1.getMessage());
                }
                log.info("消息消费失败,重回队列：id：{}", deliveryTag);
            }
        }
    }

    public static void main(String[] args) {
        String json = "";
//        new ReadMQHandler() {
//            @Override
//            public void handler(String json) {
//                System.out.println("json = " + json);
//            }
//        }.readMq(channel,deliveryTag,json);

    }
}
